This homework is about New York taxi trips. Here is something from Todd Schneider:
The New York City Taxi & Limousine Commission has released a detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. How bad is the rush hour traffic from Midtown to JFK? Where does the Bridge and Tunnel crowd hang out on Saturday nights? What time do investment bankers get to work? How has Uber changed the landscape for taxis? The dataset addresses all of these questions and many more.
The NY taxi trips dataset has been explored by a number of distinguished data scientists. It is available on Amazon S3 (Amazon's cloud storage service). The link for each file has the following form:
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year}-{month}.csv
There is one CSV file for each NY taxi service (yellow, green, fhv) and each calendar month (replacing {year} and {month} by the desired ones).
Each file is moderately large: a few gigabytes.
The full dataset, at several hundred gigabytes, is relatively large if it has to be handled on a laptop.
You will focus on the yellow taxi service and a pair of months, taken from 2015 and from 2018.
Between those two years, for-hire vehicle services took off and carved out a huge market share.
Whatever framework you use, CSV files of this size prove hard to handle.
After downloading the appropriate files (this takes time, but it is routine), a first step will consist in converting the CSV files into a more Spark-friendly format such as Parquet.
Saving into such a format requires decisions about bucketing, partitioning and so on. Such decisions influence performance; it is your call.
Depending on your internet connection, download the files corresponding to the "yellow" taxis for the years 2015 and 2018: at least one month (the same one) for each year, or all twelve if you can.
Hint. The 12 CSV files for 2015 total about 23 GB, but the corresponding Parquet file, if you can create it for all 12 months, is only about 3 GB.
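The download step can be scripted with the standard library alone. A minimal sketch — the URL pattern is the one given above; the `data/` folder and the function names are illustrative:

```python
from pathlib import Path
from urllib.request import urlretrieve

BASE = "https://s3.amazonaws.com/nyc-tlc/trip+data"

def tripdata_url(service: str, year: int, month: int) -> str:
    """Build the S3 URL of one monthly CSV, e.g. yellow_tripdata_2015-06.csv."""
    return f"{BASE}/{service}_tripdata_{year}-{month:02d}.csv"

def download_months(service, year, months, dest="data"):
    """Fetch the requested months into dest/, skipping files already present."""
    Path(dest).mkdir(parents=True, exist_ok=True)
    for m in months:
        url = tripdata_url(service, year, m)
        target = Path(dest) / url.rsplit("/", 1)[-1]
        if not target.exists():   # crude resume: do not re-download finished files
            urlretrieve(url, target)

# download_months("yellow", 2015, [6, 7])   # the months used in this homework
```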
You might need the following packages in order to work with GPS coordinates and to plot things easily.
#!pip install geojson geopandas plotly geopy
#!pip install descartes contextily ipyleaflet
# Output every expression in each cell, not only the last one
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
# import the usual suspects
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import json
import requests
from pathlib import Path
import sys
import timeit
import calendar
#%matplotlib inline
import seaborn as sns
sns.set_context("notebook", font_scale=1.2)
# Plotly
import plotly
import plotly.express as px
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
import plotly.figure_factory as ff
from plotly.subplots import make_subplots
# Ipyleaflet
import ipyleaflet
from ipyleaflet import Map, basemaps, Heatmap, linear
from random import uniform
#Geopandas
import geopandas
import contextily as ctx
# Spark
import pyspark.sql.functions as fn
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col
from pyspark.sql.catalog import Catalog
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
conf = SparkConf().setAppName("NYC Taxis")
sc = SparkContext(conf=conf)
# Set the amount of RAM for the Spark driver and executors.
# Don't give Spark all available memory: leave at least a couple of GB for the OS
# and the other processes being run. Here we use 4 GB each; scale up if your machine allows it.
spark = (SparkSession
.builder
.config("spark.executor.memory", "4gb")
.config("spark.driver.memory", "4gb")
.appName("NYC_Analysis")
.getOrCreate())
# We do this because the first time we create a session the memory parameters are
# not taken into account, so we stop it and re-run this cell, after which the values
# for executor.memory and driver.memory actually appear in the session
spark.stop()
For this homework we will let you decide on the tools to use (except for Spark, which is required) and find out information all by yourself (but don't hesitate to ask questions on the Slack channel).
We want to organize the data on a per year and per service basis.
We want to end up with one Parquet file for each year and each taxi service, since Parquet is much better suited to this workload than CSV.
Hint. Depending on your internet connection and your laptop, you can restrict yourself to the "yellow" service and one month each from 2015 and 2018.
CSV files can contain corrupted lines, so you may have to do some ETL (Extract-Transform-Load) work in order to obtain a properly typed data frame.
You are invited to proceed as follows:
Hint. Don't forget to let Spark use enough of the memory and resources of your computer.
Hint. Don't forget that you should specify a partitioning column and a number of partitions when creating the Parquet files.
Hint. Note that the schemas of the 2015 and 2018 data are different...
Hint. When working on this, ask yourself and answer the following questions:
What is the StorageLevel of the dataframe after reading the CSV files?
What is the number of partitions of the dataframe?
Is it possible to tune this number at loading time?
# Loading months of June and July for 2015
df_15_06 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2015-06.csv")
df_15_07 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2015-07.csv")
df_15_06.printSchema()
df_15_07.printSchema()
# Dataframe union for year 2015 months June and July
df_15_csv = df_15_06.union(df_15_07)
# Rounding the columns for coordinates to be able to make more efficient groupings and filtering of zones
# Adding columns Day Month and Year for partition of parquet files
df_15_csv = df_15_csv\
.withColumn("pickup_longitude", fn.round("pickup_longitude", 3))\
.withColumn("pickup_latitude", fn.round("pickup_latitude",3))\
.withColumn("dropoff_longitude", fn.round("dropoff_longitude",3))\
.withColumn("dropoff_latitude", fn.round("dropoff_latitude",3))\
.withColumn("Day", fn.dayofmonth("tpep_pickup_datetime"))\
.withColumn("Month", fn.month("tpep_pickup_datetime"))\
.withColumn("Year", fn.year("tpep_pickup_datetime"))
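As a side note on the rounding above: three decimal places of a degree correspond to roughly a city-block-sized grid. A back-of-the-envelope check, approximating one degree of latitude as 111.32 km and scaling longitude by cos(latitude):

```python
import math

LAT_KM_PER_DEG = 111.32   # mean length of one degree of latitude, in km

def grid_cell_meters(lat_deg: float, decimals: int = 3):
    """Approximate (north-south, east-west) size in metres of one rounding cell."""
    step = 10 ** -decimals                 # 0.001 degrees for 3 decimals
    ns = LAT_KM_PER_DEG * 1000 * step      # north-south extent
    ew = ns * math.cos(math.radians(lat_deg))  # east-west shrinks with latitude
    return ns, ew

ns, ew = grid_cell_meters(40.7)   # around Manhattan: roughly 111 m by 84 m
```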
df_15_csv.printSchema()
# Loading months of June and July for 2018
df_18_06 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2018-06.csv")
df_18_07 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2018-07.csv")
df_18_06.printSchema()
df_18_07.printSchema()
# Dataframe union for year 2018 months June and July
df_18_csv = df_18_06.union(df_18_07)
# Adding columns Day Month and Year for partition of parquet files
df_18_csv = df_18_csv.withColumn("Day", fn.dayofmonth("tpep_pickup_datetime"))\
.withColumn("Month", fn.month("tpep_pickup_datetime"))\
.withColumn("Year", fn.year("tpep_pickup_datetime"))
# Filter out rows with corrupt dates: the 2018 files contain pickups outside June/July 2018
df_18_csv = df_18_csv.where("(Month = 6 OR Month = 7) AND Year = 2018")
# Creation of parquet file for 2015 yellow taxi data, we partition by month and day
df_15_csv.repartition("Month", "Day").write.partitionBy("Month", "Day").parquet("parquets/yellow_tripdata_2015.parquet")
# Creation of parquet file for 2018 yellow taxi data, we partition by month and day
df_18_csv.repartition("Month", "Day").write.partitionBy("Month", "Day").parquet("parquets/yellow_tripdata_2018.parquet")
From now on, you will be using the parquet files you created for 2015.
We shall visualize several features of taxi traffic during one calendar month in 2015 and the same calendar month in 2018.
Hint. In order to build appealing graphics, you may stick to matplotlib + seaborn; you can also use
plotly, which is used a lot to build interactive graphics, but you can use whatever you want.
df_15_06_pds = pd.read_parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
df_15_06_spark = spark.read.parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
#df_15_06_spark.select("VendorID", "passenger_count", "tpep_pickup_datetime").show()
The following longitudes and latitudes encompass Newark and JFK airports, Northern Manhattan and the Verrazzano bridge.
long_min = -74.10
long_max = -73.70
lat_min = 40.58
lat_max = 40.90
Compute the distribution of passenger_count and make a plot of it.
query1 = f"""
(pickup_longitude BETWEEN {long_min} AND {long_max} AND pickup_latitude BETWEEN {lat_min} AND {lat_max})
AND (dropoff_longitude BETWEEN {long_min} AND {long_max} AND dropoff_latitude BETWEEN {lat_min} AND {lat_max})"""
df_coordinates = df_15_06_spark.where(query1)
filtered_count = df_coordinates.groupBy("passenger_count").count().toPandas()
fig = px.bar(filtered_count, x='passenger_count', y='count', hover_data=['count', 'passenger_count'],
color='count',
labels={'passenger_count':'# of passengers', 'count':'# of trips'}, height=600,
color_continuous_scale=px.colors.sequential.Viridis
)
fig.update_layout(
title="Number of trips for different passenger occupancies",
xaxis_title="Number of passengers",
yaxis_title="Number of trips")
fig.show()
Trips with $0$ passengers or more than $6$ passengers are pretty rare. We suspect these to be outliers, and we need to explore them further in order to understand what might be wrong with them.
zero_passenger_df = df_15_06_spark.where("passenger_count = 0").toPandas()
plus6_passenger_df = df_15_06_spark.where("passenger_count > 6").toPandas()
They should not exist since a taxi with no passengers is just a car.
zero_passenger_df
They should also not exist since a New York taxi can carry at most 5-6 people.
plus6_passenger_df.head(10)
The largest distance travelled this month is 10,083,318 miles, whilst the distance from the Earth to the Moon is 238,900 miles. This means that someone travelled to the Moon and back more than 21 times in a single trip!
max_distance = df_coordinates.agg({"trip_distance": "max"}).collect()[0][0]
print("Largest distance travelled this month: {:,} miles".format(max_distance))
print("Distance from Earth to the Moon: 238,900 miles")
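The round-trip count is simple arithmetic on the two distances just printed:

```python
max_trip_miles = 10_083_318    # largest trip_distance in the June 2015 file
earth_moon_miles = 238_900     # approximate Earth-Moon distance

round_trips = max_trip_miles / (2 * earth_moon_miles)   # there and back
# a little over 21 round trips to the Moon
```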
Plot the distribution of trip_distance (using a histogram, for instance) during year 2015. Focus on trips with a non-zero trip distance of less than 30 miles.
distance_travelling = df_15_06_pds[(df_15_06_pds['trip_distance'] > 0) & (df_15_06_pds['trip_distance'] < 30)]
#distance_travelling
fig = plt.figure(figsize=(10,5))
sns.set()
g = sns.distplot(distance_travelling["trip_distance"], color="purple")
g.set_title("Trip distance distribution for distances between 0 and 30 miles")
plt.xlabel("Distance in miles")
plt.ylabel("Proportion")
plt.show(g)
Let's look at what Spark does for these computations.
Use the explain method, or have a look at the Spark UI, to analyze the job. You should be able to spot HashAggregate and Exchange hashpartitioning steps. Are there shuffle operations? If yes, how many?
Now, compute the following and produce relevant plots:
distance_travelling = distance_travelling.copy()  # avoid SettingWithCopyWarning on the filtered view
distance_travelling["Week_day"] = distance_travelling['tpep_pickup_datetime'].dt.day_name()
fig, axes = plt.subplots(nrows=7 , ncols=1, figsize=(15, 40))
for i, day in enumerate(calendar.day_name[0:]):
sns.distplot(distance_travelling[distance_travelling["Week_day"] == day]["trip_distance"], ax=axes[i], color="purple")\
.set(xlabel='Trip Distance', ylabel='Proportion', title=f"{day} trip distribution")
fig.subplots_adjust(top=0.9, hspace = .6)
plt.show()
df_coordinates.select("pickup_longitude", "pickup_latitude").distinct().count()
# Borough boundaries from the NYC Open Data geojson (the geopandas 'nybb' sample dataset would also work)
ny = geopandas.read_file("BoroughBoundaries.geojson")
bronx = ny["geometry"].iloc[0].bounds
staten = ny["geometry"].iloc[1].bounds
brooklyn = ny["geometry"].iloc[2].bounds
queens = ny["geometry"].iloc[3].bounds
manhattan = ny["geometry"].iloc[4].bounds
#Bronx
bronx_long_min = bronx[0]
bronx_long_max = bronx[2]
bronx_lat_min = bronx[1]
bronx_lat_max = bronx[3]
#Staten
staten_long_min = staten[0]
staten_long_max = staten[2]
staten_lat_min = staten[1]
staten_lat_max = staten[3]
#Brooklyn
brooklyn_long_min = brooklyn[0]
brooklyn_long_max = brooklyn[2]
brooklyn_lat_min = brooklyn[1]
brooklyn_lat_max = brooklyn[3]
#Queens
queens_long_min = queens[0]
queens_long_max = queens[2]
queens_lat_min = queens[1]
queens_lat_max = queens[3]
#Manhattan
manhattan_long_min = manhattan[0]
manhattan_long_max = manhattan[2]
manhattan_lat_min = manhattan[1]
manhattan_lat_max = manhattan[3]
querybronx = f"(pickup_longitude BETWEEN {bronx_long_min} AND {bronx_long_max} AND pickup_latitude BETWEEN {bronx_lat_min} AND {bronx_lat_max})"
queryqueens = f"(pickup_longitude BETWEEN {queens_long_min} AND {queens_long_max} AND pickup_latitude BETWEEN {queens_lat_min} AND {queens_lat_max})"
querymanhattan = f"(pickup_longitude BETWEEN {manhattan_long_min} AND {manhattan_long_max} AND pickup_latitude BETWEEN {manhattan_lat_min} AND {manhattan_lat_max})"
querystaten = f"(pickup_longitude BETWEEN {staten_long_min} AND {staten_long_max} AND pickup_latitude BETWEEN {staten_lat_min} AND {staten_lat_max})"
querybrooklyn = f"(pickup_longitude BETWEEN {brooklyn_long_min} AND {brooklyn_long_max} AND pickup_latitude BETWEEN {brooklyn_lat_min} AND {brooklyn_lat_max})"
distinct_profits = df_coordinates.groupBy("pickup_longitude", "pickup_latitude").agg({"total_amount":"sum", "tip_amount":"sum"})
df_bronx = distinct_profits.where(querybronx).withColumn("zone", fn.lit("Bronx"))
df_manhattan = distinct_profits.where(querymanhattan).withColumn("zone", fn.lit("Manhattan"))
df_queens = distinct_profits.where(queryqueens).withColumn("zone", fn.lit("Queens"))
df_brooklyn = distinct_profits.where(querybrooklyn).withColumn("zone", fn.lit("Brooklyn"))
df_staten = distinct_profits.where(querystaten).withColumn("zone", fn.lit("Staten Island"))
boroughs = df_bronx.union(df_manhattan)
boroughs = boroughs.union(df_queens)
boroughs = boroughs.union(df_brooklyn)
boroughs = boroughs.union(df_staten)
boroughs = boroughs.groupBy("zone").agg({"sum(total_amount)":"sum", "sum(tip_amount)":"sum"})
boroughs = boroughs.toPandas()
boroughs = boroughs.rename(columns={"sum(sum(tip_amount))": "tips", "sum(sum(total_amount))": "profits"})
fig = go.Figure()
fig.add_trace(go.Scatter(x=boroughs["zone"], y=boroughs["tips"],
mode='lines+markers',
name='Tips',
line_shape='spline'))
fig.add_trace(go.Scatter(x=boroughs["zone"], y=boroughs["profits"],
mode='lines+markers',
name='Profits',
line_shape='spline'))
fig.update_layout(
title="Tips and profits for each NY Borough",
xaxis_title="New York City Boroughs",
yaxis_title="Dollars")
fig.show()
Consider one month of trip data from yellow taxis for each year.
df_15_06_spark = spark.read.parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
df_18_06_spark = spark.read.parquet("parquets/yellow_tripdata_2018.parquet/Month=6")
Compute and plot the following time series indexed by day of the week and hour of day:
df_15_06_spark = df_15_06_spark.withColumn("week_day", fn.date_format("tpep_pickup_datetime", "EEEE")).withColumn("Hour", fn.hour("tpep_pickup_datetime"))
df_18_06_spark = df_18_06_spark.withColumn("week_day", fn.date_format("tpep_pickup_datetime", "EEEE")).withColumn("Hour", fn.hour("tpep_pickup_datetime"))
number_pickups_weekday_hour_15 = df_15_06_spark.groupby("week_day", "Hour").count()
number_pickups_weekday_hour_18 = df_18_06_spark.groupby("week_day", "Hour").count()
pickups_df_15 = number_pickups_weekday_hour_15.toPandas()
pickups_df_18 = number_pickups_weekday_hour_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
pickups_df_15["week_day"] = pd.Categorical(pickups_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
pickups_df_15 = pickups_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
pickups_df_18["week_day"] = pd.Categorical(pickups_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
pickups_df_18 = pickups_df_18.sort_values(["week_day","Hour"])
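The ordered-Categorical trick used above is worth seeing on a toy frame: without it, sort_values would order the day names alphabetically.

```python
import calendar
import pandas as pd

toy = pd.DataFrame({"week_day": ["Sunday", "Monday", "Friday"],
                    "count": [10, 20, 30]})
# declare the weekday order explicitly (Monday first, as in calendar.day_name)
toy["week_day"] = pd.Categorical(toy["week_day"],
                                 categories=list(calendar.day_name), ordered=True)
toy = toy.sort_values("week_day")
# rows now come out Monday, Friday, Sunday instead of alphabetical order
```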
fig = px.line(pickups_df_15, x='Hour', y='count', color = "week_day", line_shape='spline',
labels={'count':'# of pickups', 'week_day':'Day '})
fig.update_layout(
title="Number of pickups per hour for every day of the week in June 2015",
xaxis_title="Hour of the day",
yaxis_title="Number of pickups",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(pickups_df_18, x='Hour', y='count', color = "week_day", line_shape='spline',
labels={'count':'# of pickups', 'week_day':'Day '})
fig.update_layout(
title="Number of pickups per hour for every day of the week in June 2018",
xaxis_title="Hour of the day",
yaxis_title="Number of pickups",
legend_title_text='Day of the week'
)
fig.show()
avg_fare_spark_15 = df_15_06_spark.groupby("week_day", "Hour").agg({"total_amount": "avg"})
avg_fare_spark_18 = df_18_06_spark.groupby("week_day", "Hour").agg({"total_amount": "avg"})
avg_fare_df_15 = avg_fare_spark_15.toPandas()
avg_fare_df_18 = avg_fare_spark_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
avg_fare_df_15["week_day"] = pd.Categorical(avg_fare_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
avg_fare_df_15 = avg_fare_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
avg_fare_df_18["week_day"] = pd.Categorical(avg_fare_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
avg_fare_df_18 = avg_fare_df_18.sort_values(["week_day","Hour"])
fig = px.line(avg_fare_df_15, x='Hour', y='avg(total_amount)', color = "week_day", line_shape='spline',
labels={'avg(total_amount)':'avg fare', 'week_day':'Day '})
fig.update_layout(
title="Average fare per hour for every day of the week in June 2015",
xaxis_title="Hour of the day",
yaxis_title="Average fare in dollars",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(avg_fare_df_18, x='Hour', y='avg(total_amount)', color = "week_day",line_shape='spline',
labels={'avg(total_amount)':'avg fare', 'week_day':'Day '})
fig.update_layout(
title="Average fare per hour for every day of the week in June 2018",
xaxis_title="Hour of the day",
yaxis_title="Average fare in dollars",
legend_title_text='Day of the week'
)
fig.show()
duration_spark_15 = df_15_06_spark.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
duration_spark_18 = df_18_06_spark.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
avg_trip_spark_15 = duration_spark_15.groupby("week_day", "Hour").agg({"duration": "avg"})
avg_trip_spark_18 = duration_spark_18.groupby("week_day", "Hour").agg({"duration": "avg"})
avg_trip_df_15 = avg_trip_spark_15.toPandas()
avg_trip_df_18 = avg_trip_spark_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
avg_trip_df_15["week_day"] = pd.Categorical(avg_trip_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
avg_trip_df_15 = avg_trip_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
avg_trip_df_18["week_day"] = pd.Categorical(avg_trip_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
avg_trip_df_18 = avg_trip_df_18.sort_values(["week_day","Hour"])
fig = px.line(avg_trip_df_15, x='Hour', y='avg(duration)', color = "week_day", line_shape='spline',
labels={'avg(duration)':'avg trip duration ', 'week_day':'Day '})
fig.update_layout(
title="Average trip duration per hour for every day of the week in June 2015",
xaxis_title="Hour of the day",
yaxis_title="Average trip duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(avg_trip_df_18, x='Hour', y='avg(duration)', color = "week_day", line_shape='spline',
labels={'avg(duration)':'avg trip duration ', 'week_day':'Day '})
fig.update_layout(
title="Average trip duration per hour for every day of the week in June 2018",
xaxis_title="Hour of the day",
yaxis_title="Average trip duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
avg_ongoing_trips_15 = df_15_06_spark.groupBy("Day", "Hour").count()
avg_ongoing_trips_18 = df_18_06_spark.groupBy("Day", "Hour").count()
avg_ongoing_trips_15 = avg_ongoing_trips_15.groupBy("Hour").agg({"count":"avg"})
avg_ongoing_trips_18 = avg_ongoing_trips_18.groupBy("Hour").agg({"count":"avg"})
avg_ongoing_trips_15 = avg_ongoing_trips_15.toPandas()
avg_ongoing_trips_18 = avg_ongoing_trips_18.toPandas()
avg_ongoing_trips_15 = avg_ongoing_trips_15.sort_values(by='Hour', ascending=True)
avg_ongoing_trips_18 = avg_ongoing_trips_18.sort_values(by='Hour', ascending=True)
fig = go.Figure()
fig.add_trace(go.Scatter(x=avg_ongoing_trips_15["Hour"], y=avg_ongoing_trips_15["avg(count)"],
mode='lines+markers',
name='2015',
line_shape='spline'))
fig.add_trace(go.Scatter(x=avg_ongoing_trips_18["Hour"], y=avg_ongoing_trips_18["avg(count)"],
mode='lines+markers',
name='2018',
line_shape='spline'))
fig.update_layout(
title="Average number of ongoing trips per hour in June",
legend_title_text='Year',
xaxis_title="Hour of the day",
yaxis_title="Number of trips")
fig.show()
In order to find the longitudes and latitudes of JFK and Newark airports, as well as those of Manhattan, you can use a service like geojson.io. Plot the following time series, indexed by day of the week and hour of the day.
#Exact JFK coordinates
JFK_long = -73.78606796264648
JFK_lat = 40.64274482191706
#JFK coordinates
jfk_long_min = -73.83
jfk_long_max = -73.74
jfk_lat_min = 40.62
jfk_lat_max = 40.67
jfk_id = 132
# Midtown coordinates
midtown_long_min = -74.027
midtown_long_max = -73.95
midtown_lat_min = 40.725
midtown_lat_max = 40.77
midtown_ids = (224, 164, 107, 90, 246, 68, 48, 163, 162, 229, 50, 230)
# Function defined to calculate the median of a list of values
def median(values_list):
med = np.median(values_list)
return float(med)
udf_median = fn.udf(median, FloatType())
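The UDF simply applies np.median to the values collected per group, so its behaviour on odd and even group sizes can be checked without Spark. Note that recent Spark versions (3.1+) ship a built-in percentile_approx that avoids collecting each group into a Python list, at the price of an approximate answer.

```python
import numpy as np

def median(values_list):
    # same body as the Spark UDF above
    return float(np.median(values_list))

# odd count: the middle element; even count: mean of the two middle elements
odd = median([3.0, 1.0, 2.0])         # 2.0
even = median([1.0, 2.0, 3.0, 10.0])  # 2.5
```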
queryMid_JFK_15 = f"(pickup_longitude BETWEEN {midtown_long_min} AND {midtown_long_max} AND pickup_latitude BETWEEN {midtown_lat_min} AND {midtown_lat_max}) AND (dropoff_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND dropoff_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max})"
midtown_jfk_df_15 = df_15_06_spark.where(queryMid_JFK_15)
queryMid_JFK_18 = f"PULocationID IN {midtown_ids} AND DOLocationID = {jfk_id}"
midtown_jfk_df_18 = df_18_06_spark.where(queryMid_JFK_18)
midtown_jfk_df_15 = midtown_jfk_df_15.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
midtown_jfk_df_18 = midtown_jfk_df_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
median_trip_MID_JFK_spark_15 = midtown_jfk_df_15.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_MID_JFK_spark_18 = midtown_jfk_df_18.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
med_trip_MID_JFK_df_15 = median_trip_MID_JFK_spark_15.toPandas()
med_trip_MID_JFK_df_18 = median_trip_MID_JFK_spark_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
med_trip_MID_JFK_df_15["week_day"] = pd.Categorical(med_trip_MID_JFK_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
med_trip_MID_JFK_df_15 = med_trip_MID_JFK_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
med_trip_MID_JFK_df_18["week_day"] = pd.Categorical(med_trip_MID_JFK_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
med_trip_MID_JFK_df_18 = med_trip_MID_JFK_df_18.sort_values(["week_day","Hour"])
fig = px.line(med_trip_MID_JFK_df_15, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from Midtown to JFK airport per day per hour | June 2015",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(med_trip_MID_JFK_df_18, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from Midtown to JFK airport per day per hour | June 2018",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
queryJFK_MID_15 = f"(pickup_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND pickup_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max}) AND (dropoff_longitude BETWEEN {midtown_long_min} AND {midtown_long_max} AND dropoff_latitude BETWEEN {midtown_lat_min} AND {midtown_lat_max})"
jfk_midtown_df_15= df_15_06_spark.where(queryJFK_MID_15)
queryJFK_MID_18 = f"PULocationID = {jfk_id} AND DOLocationID IN {midtown_ids}"
jfk_midtown_df_18 = df_18_06_spark.where(queryJFK_MID_18)
jfk_midtown_df_15 = jfk_midtown_df_15.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
jfk_midtown_df_18 = jfk_midtown_df_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
median_trip_JFK_MID_spark_15 = jfk_midtown_df_15.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_JFK_MID_spark_18 = jfk_midtown_df_18.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_JFK_MID_df_15 = median_trip_JFK_MID_spark_15.toPandas()
median_trip_JFK_MID_df_18 = median_trip_JFK_MID_spark_18.toPandas()
median_trip_JFK_MID_df_15["week_day"] = pd.Categorical(median_trip_JFK_MID_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
median_trip_JFK_MID_df_15 = median_trip_JFK_MID_df_15.sort_values(["week_day","Hour"])
median_trip_JFK_MID_df_18["week_day"] = pd.Categorical(median_trip_JFK_MID_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
median_trip_JFK_MID_df_18 = median_trip_JFK_MID_df_18.sort_values(["week_day","Hour"])
fig = px.line(median_trip_JFK_MID_df_15, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from JFK airport to Midtown per day per hour | June 2015",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(median_trip_JFK_MID_df_18, x='Hour', y='median_duration', color = "week_day", line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from JFK airport to Midtown per day per hour | June 2018",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
For this, you will need to find tools to display maps and to build choropleth maps. We let you look and find relevant tools to do this.
A. number of pickups
total_pickups_15_06 = df_15_06_spark.groupBy("pickup_latitude", "pickup_longitude").count()
total_pickups_15_06_df = total_pickups_15_06.toPandas()
#total_pickups_2015_06_df
locationsA = total_pickups_15_06_df.values.tolist()
mapA = Map(center=(40.67, -73.94), zoom=8)
heatmap = Heatmap(locations=locationsA,radius=8, blur =15)
mapA.add_layer(heatmap);
mapA
B. number of dropoffs
total_dropoffs_15_06 = df_15_06_spark.groupBy("dropoff_latitude", "dropoff_longitude").count()
total_dropoffs_15_06.count()
total_dropoffs_15_06_df = total_dropoffs_15_06.toPandas()
#total_dropoffs_2015_06_df
locationsB = total_dropoffs_15_06_df.values.tolist()
mapB = Map(center=(40.67, -73.94), zoom=8)
heatmap = Heatmap(locations=locationsB,radius=8, blur =15)
mapB.add_layer(heatmap);
mapB
C. number of pickups with dropoff at some airport (JFK, LaGuardia, Newark)
#Newark coordinates
newark_long_min =-74.202
newark_long_max = -74.145
newark_lat_min = 40.66
newark_lat_max = 40.711
#LaGuardia coordinates
laguardia_long_min = -73.89
laguardia_long_max = -73.85
laguardia_lat_min = 40.764
laguardia_lat_max = 40.785
airport_dropoff_query = f"""
(dropoff_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND dropoff_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max})
OR
(dropoff_longitude BETWEEN {newark_long_min} AND {newark_long_max} AND dropoff_latitude BETWEEN {newark_lat_min} AND {newark_lat_max})
OR
(dropoff_longitude BETWEEN {laguardia_long_min} AND {laguardia_long_max} AND dropoff_latitude BETWEEN {laguardia_lat_min} AND {laguardia_lat_max})
"""
airport_dropoffs_df = df_15_06_spark.where(airport_dropoff_query)
#airport_dropoffs_df.count()
pickups_to_airport = airport_dropoffs_df.groupBy("pickup_latitude", "pickup_longitude").count()
#pickups_to_airport.count()
pickups_to_airport_df = pickups_to_airport.toPandas()
#pickups_to_airport_df
locationsC = pickups_to_airport_df.values.tolist()
mapC = Map(center=(40.67, -73.94), zoom=8)
heatmap = Heatmap(locations=locationsC,radius=8, blur =15)
mapC.add_layer(heatmap);
mapC
pdf_18 = spark.read.parquet("parquets/yellow_tripdata_2018.parquet")
zones = geopandas.read_file("NYCTaxiZones.geojson")
zones.head()
zones_info = zones[["objectid", "zone"]].copy()  # copy to avoid SettingWithCopyWarning
zones_info["objectid"] = zones_info["objectid"].astype(int)
A. number of pickups in the area
pickups_18 = pdf_18.groupBy("PULocationID").count()
pickups_18_df = pickups_18.toPandas()
pickups_18_df = pd.merge(pickups_18_df, zones_info, how='left', left_on='PULocationID', right_on='objectid')
pickups_18_df = pickups_18_df[["PULocationID", "count", "zone"]]
pickups_18_df.head()
fig = px.choropleth(pickups_18_df, geojson=zones,
locations='PULocationID', color='count',
hover_name = 'zone',
labels={'PULocationID':'Taxi zone', 'count': 'Pickups'}
)
fig.update_geos(fitbounds="locations", visible=True)
fig.update_layout(title_text = 'Number of pickups per NYC Taxi area')
fig.show()
B. ratio of number of payments by card/number of cash payments for pickups in the area
# payment_type 1 = credit card, 2 = cash (TLC data dictionary)
payment_ratio_18 = pdf_18.groupBy("PULocationID","payment_type").count()
t1 = payment_ratio_18.where("payment_type = 1").select("PULocationID", fn.col("count").alias("t1"))
t2 = payment_ratio_18.where("payment_type = 2").select("PULocationID", fn.col("count").alias("t2"))
payment_types = t1.join(t2, "PULocationID")
payment_types = payment_types.withColumn("ratio", fn.round(fn.col("t1")/fn.col("t2"), 3))  # card count / cash count
#payment_types.show()
payment_ratio = payment_types.toPandas()
payment_ratio = pd.merge(payment_ratio, zones_info, how='left', left_on='PULocationID', right_on='objectid')
payment_ratio = payment_ratio[["PULocationID", "ratio", "zone"]]
payment_ratio.head()
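The join-and-divide logic can be sanity-checked on toy numbers in pure pandas. In the TLC data dictionary payment_type 1 is credit card and 2 is cash, so the card/cash ratio is the type-1 count over the type-2 count:

```python
import pandas as pd

# toy per-zone payment counts: payment_type 1 = card, 2 = cash
counts = pd.DataFrame({
    "PULocationID": [1, 1, 2, 2],
    "payment_type": [1, 2, 1, 2],
    "count":        [30, 10, 5, 20],
})

pivot = counts.pivot(index="PULocationID", columns="payment_type", values="count")
pivot["card_cash_ratio"] = pivot[1] / pivot[2]   # card count over cash count
# zone 1 -> 3.0 (mostly card), zone 2 -> 0.25 (mostly cash)
```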
fig = px.choropleth(payment_ratio, geojson=zones,
locations='PULocationID', color='ratio',
hover_name = 'zone',
labels={'PULocationID':'Taxi zone', 'ratio': 'Card/Cash ratio'}
)
fig.update_geos(fitbounds="locations", visible=True)
fig.update_layout(title_text = 'Card/Cash payment ratio per NYC Taxi area')
fig.show()
C. ratio of total fare/trip duration for dropoff in the area
pdf_18 = pdf_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)  # trip duration in minutes
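The cast-to-long trick above takes a difference in epoch seconds and divides by 60 to get minutes. The same arithmetic can be sanity-checked in plain pandas on a synthetic trip (an illustrative sketch, not part of the pipeline):

```python
import pandas as pd

# One synthetic trip lasting 27 minutes 30 seconds
trips = pd.DataFrame({
    "tpep_pickup_datetime":  pd.to_datetime(["2018-01-01 08:00:00"]),
    "tpep_dropoff_datetime": pd.to_datetime(["2018-01-01 08:27:30"]),
})
# Difference in seconds divided by 60 mirrors the Spark cast("long") computation
duration_min = (trips["tpep_dropoff_datetime"]
                - trips["tpep_pickup_datetime"]).dt.total_seconds() / 60
print(duration_min.iloc[0])  # 27.5
```

On the real data it is also worth filtering out non-positive durations before using them as a denominator.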
doffs_18 = pdf_18.groupBy("DOLocationID").agg(
    fn.sum("total_amount").alias("total_fare"),
    fn.sum("duration").alias("total_duration"))
ratio_fare_duration = doffs_18.withColumn("ratio", fn.col("total_fare")/fn.col("total_duration"))
ratio_fare_duration = ratio_fare_duration.toPandas()
ratio_fare_duration = pd.merge(ratio_fare_duration, zones_info, how='left', left_on='DOLocationID', right_on='objectid')
ratio_fare_duration = ratio_fare_duration[["DOLocationID", "ratio", "zone"]]
ratio_fare_duration.head()
fig = px.choropleth(ratio_fare_duration, geojson=zones,
                    locations='DOLocationID', color='ratio',
                    featureidkey='properties.objectid',  # match zone IDs against the geojson, not the default 'id'
                    hover_name='zone',
                    labels={'DOLocationID': 'Taxi zone', 'ratio': 'Fare/Duration ratio'}
                    )
fig.update_geos(fitbounds="locations", visible=True)
fig.update_layout(title_text = 'Total fare-Trip duration ratio per dropoff NYC Taxi area')
fig.show()
Maps with a slider on the hour of day, where the color is a function of question A and question B
pdf_18 = pdf_18.withColumn("Hour", fn.hour("tpep_dropoff_datetime"))
with open('NYCTaxiZones.geojson') as json_file:
    jsdata = json.load(json_file)
data = dict(
type='choropleth', # type of map-plot
colorscale = 'Viridis',
autocolorscale = False,
locations = ratio_fare_duration['DOLocationID'], # taxi zone IDs to match against the geojson
z = ratio_fare_duration['ratio'], # the variable to color-code
geojson = jsdata,
featureidkey = "properties.objectid",
colorbar = dict(title = "Ratio"))
layout = dict(title = "Ratio values", geo=dict(fitbounds="locations", visible=True))
fig = dict( data=data, layout=layout )
plotly.offline.iplot(fig)
A. average number of dropoffs in the area during that hour of the day
avg_doffs_hour_18 = pdf_18.groupBy("DOLocationID","Hour").count()
#avg_doffs_hour_18.show()
slider3A_df = avg_doffs_hour_18.withColumn("average", fn.col("count")/61)  # 61 = number of days covered by the two downloaded months
slider3A_df = slider3A_df.select("DOLocationID", "Hour", "average").toPandas()
slider3A_df.head()
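The divisor 61 is a hard-coded day count; it only yields a daily average if the 2018 sample really spans 61 days. Deriving the divisor from the months actually downloaded is safer. A sketch, assuming for illustration that the two months are May and June 2018 (an assumption, since the homework lets you pick any pair):

```python
import calendar

# Hypothetical choice of months -- replace with the ones actually downloaded
months = [(2018, 5), (2018, 6)]
n_days = sum(calendar.monthrange(year, month)[1] for year, month in months)
print(n_days)  # 61 for May + June
```

`slider3A_df` would then divide by `n_days` instead of the literal 61.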
# Create empty list for data objects:
data_sliderA = []
for hour in sorted(slider3A_df.Hour.unique()):  # sorted so trace i corresponds to hour i
    # Filter the df by hour
    df_hourly = slider3A_df[slider3A_df['Hour'] == hour]
    # Build the data dictionary for the current hour
    hourly_data = dict(
        type='choropleth',  # type of map-plot
        colorscale='Viridis',
        locations=df_hourly['DOLocationID'],  # taxi zone IDs
        z=df_hourly['average'],  # the variable to color-code
        geojson=jsdata,
        featureidkey="properties.objectid")
    data_sliderA.append(hourly_data)  # one trace per hour, for the slider
# Steps for the slider
stepsA = []
for i in range(len(data_sliderA)):
    step = dict(method='restyle',
                args=['visible', [False] * len(data_sliderA)],
                label='Hour {}'.format(i))  # label displayed for each step (hour of day)
    step['args'][1][i] = True
    stepsA.append(step)
# Create the 'sliders' object from the steps
slidersA = [dict(active=0, pad={"t": 1}, steps=stepsA)]
# I set up the layout (including slider option)
layoutA = dict(geo=dict(fitbounds="locations", visible=True), sliders=slidersA)
# I create the figure object:
figA = dict(data=data_sliderA, layout=layoutA)
# to plot in the notebook
plotly.offline.plot(figA)
#This will output an HTML file that can be opened in the browser, because opening it in the notebook is too heavy
#plotly.offline.plot(fig, auto_open=True, image = 'png', image_filename="avg_number_dropoffs" ,image_width=2000, image_height=1000,
# filename='sliderA.html', validate=True)
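The restyle steps built above all follow the same plotly slider pattern: step i carries a boolean visibility mask that hides every trace except trace i. The mask logic is plain Python and easy to check in isolation (a standalone sketch, independent of plotly):

```python
def visibility_masks(n_traces):
    """One boolean mask per slider step; step i shows only trace i."""
    masks = []
    for i in range(n_traces):
        mask = [False] * n_traces  # hide everything...
        mask[i] = True             # ...except the current trace
        masks.append(mask)
    return masks

print(visibility_masks(3))
# [[True, False, False], [False, True, False], [False, False, True]]
```

Each mask becomes the second element of a step's `args=['visible', mask]`, which is exactly what the `step['args'][1][i] = True` line produces.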
B. average ratio of tip over total fare amount for pickups in the area at a given hour of the day
ratio_tip_fare = pdf_18.groupBy("PULocationID", "Hour").agg({"total_amount": "sum", "tip_amount": "sum"})  # pickups, as the question asks
slider3B_df = ratio_tip_fare.withColumn("ratio", fn.col("sum(tip_amount)")/fn.col("sum(total_amount)"))
slider3B_df = slider3B_df.select("PULocationID", "Hour", "ratio").toPandas()
slider3B_df.head()
# Create empty list for data objects:
data_sliderB = []
for hour in sorted(slider3B_df.Hour.unique()):  # sorted so trace i corresponds to hour i
    # Filter the df by hour
    df_hourly = slider3B_df[slider3B_df['Hour'] == hour]
    # Build the data dictionary for the current hour
    hourly_data = dict(
        type='choropleth',  # type of map-plot
        colorscale='Viridis',
        autocolorscale=False,
        locations=df_hourly['PULocationID'],  # taxi zone IDs
        z=df_hourly['ratio'],  # the variable to color-code
        geojson=jsdata,
        featureidkey="properties.objectid")
    data_sliderB.append(hourly_data)  # one trace per hour, for the slider
## I create the steps for the slider
stepsB = []
for i in range(len(data_sliderB)):
    step = dict(method='restyle',
                args=['visible', [False] * len(data_sliderB)],
                label='Hour {}'.format(i))  # label displayed for each step (hour of day)
    step['args'][1][i] = True
    stepsB.append(step)
## I create the 'sliders' object from the 'steps'
slidersB = [dict(active=0, pad={"t": 1}, steps=stepsB)]
layoutB = dict(geo=dict(fitbounds="locations", visible=True), sliders=slidersB)
# I create the figure object:
figB = dict(data=data_sliderB, layout=layoutB)
# Plot in the notebook
#plotly.offline.iplot(figB)
# This outputs an HTML file to open in the browser, because rendering it in the notebook is too heavy
plotly.offline.plot(figB, auto_open=True, image='png', image_filename="ratio_tip_fare", image_width=2000, image_height=1000,
                    filename='sliderB.html', validate=True)
spark.stop()